package com.dc.schedule.client.socket;

import com.dc.schedule.api.base.WaitResponseLock;
import com.dc.schedule.api.exception.ConnectionTimeoutException;
import com.dc.schedule.api.model.SocketMessage;
import com.dc.schedule.client.ScheduleEventListener;
import com.dc.schedule.client.concurrent.DefaultThreadFactory;
import com.dc.schedule.client.convertor.ClientMessageConvertor;
import com.dc.schedule.client.convertor.JacksonClientMessageConvertorImpl;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocking-socket implementation of {@link ScheduleSocketClient}.
 *
 * <p>The client picks a random endpoint, connects to it on a pool thread and then reads
 * the connection line by line in {@link #eventLoop()}. Incoming messages whose request uid
 * matches a pending {@link #sendMessageForResponse(SocketMessage)} call are handed to that
 * caller; everything else is forwarded to the configured {@link ScheduleEventListener}.
 * When the connection is lost while the client is running, a reconnect is scheduled.
 *
 * <p>Writes and (re)connects are serialized by {@link #lock}. The state flags are
 * {@code volatile} because they are also read and written without holding the lock.
 */
@Slf4j
public class SimpleScheduleSocketClient implements ScheduleSocketClient {
    /** Pause between two connection attempts, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MILLIS = 800L;

    /** Converts messages to and from their text form on the wire. */
    @Setter
    private ClientMessageConvertor clientMessageConvertor = new JacksonClientMessageConvertorImpl();
    private ScheduleEventListener scheduleEventListener;
    private List<InetSocketAddress> endpoints;
    private Socket socket;
    private BufferedReader socketReader;
    private BufferedWriter socketWriter;
    /** Timeout, in seconds, used for connecting and for waiting until a connection exists. */
    @Setter
    private int connectTimeoutSecond = 3;

    /** True while a socket is open; read by the event loop and connect loop without the lock. */
    private volatile boolean connected = false;
    /** True once the listener's {@code onConnect()} has returned for the current connection. */
    private volatile boolean afterConnected = false;

    /** True between {@link #start()} and {@link #shutdown()}. */
    private volatile boolean running = false;
    /** Pending request/response exchanges, keyed by request uid. */
    private final Map<String, WaitResponseLock<SocketMessage>> responseWaitMap = new ConcurrentHashMap<>(3);

    private final Lock lock = new ReentrantLock();
    /** Signalled when {@link #connected} becomes true. */
    private final Condition connectionCondition = lock.newCondition();
    /** Signalled when {@link #afterConnected} becomes true. */
    private final Condition afterConnectedCondition = lock.newCondition();

    private final Random random = new Random();

    /** Runs the connect loop and the event loop; tasks are handed off directly (no queue). */
    private final ExecutorService eventLoopPool = new ThreadPoolExecutor(
            1, 3,
            0L, TimeUnit.MILLISECONDS,
            new SynchronousQueue<>(),
            new DefaultThreadFactory("sc-event", true)
    );

    @Override
    public void setEndpoints(List<InetSocketAddress> endpoints) {
        this.endpoints = endpoints;
    }

    @Override
    public void setScheduleEventListener(ScheduleEventListener listener) {
        this.scheduleEventListener = listener;
    }

    /**
     * Encodes and writes a message, waiting for a connection if none exists yet.
     *
     * <p>A request uid is generated only when the message does not already carry one.
     *
     * @param socketMessage the message to send
     * @throws IOException          if writing fails; the connection is closed and the listener
     *                              is notified before the exception is rethrown
     * @throws InterruptedException if the thread is interrupted while waiting for a connection
     */
    @Override
    public void sendMessage(SocketMessage socketMessage) throws IOException, InterruptedException {
        if (socketMessage.getRequestUid() == null) {
            socketMessage.setRequestUid(UUID.randomUUID().toString());
        }
        writeWhenConnected(clientMessageConvertor.encode(socketMessage));
    }

    /**
     * Sends a message and returns the reply that carries the same request uid.
     *
     * <p>The request uid of {@code socketMessage} is always replaced with a fresh UUID. The
     * waiter is registered before the write so that a fast reply cannot be missed.
     *
     * @param socketMessage the request to send
     * @return the value produced by {@code WaitResponseLock#getResponse()} for this request
     * @throws IOException          if writing fails; the connection is closed and the listener
     *                              is notified before the exception is rethrown
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public SocketMessage sendMessageForResponse(SocketMessage socketMessage) throws IOException, InterruptedException {
        final String requestUid = UUID.randomUUID().toString();
        socketMessage.setRequestUid(requestUid);
        final WaitResponseLock<SocketMessage> response = new WaitResponseLock<>();
        responseWaitMap.put(requestUid, response);
        try {
            writeWhenConnected(clientMessageConvertor.encode(socketMessage));
            // Wait for the reply only after the write lock has been released, so that other
            // senders and a pending reconnect are not stalled behind this exchange.
            return response.getResponse();
        } finally {
            responseWaitMap.remove(requestUid);
        }
    }

    /**
     * Waits until a connection exists, then writes and flushes {@code message} under the lock.
     * On an {@link IOException} the connection is closed and the listener is told about it.
     */
    private void writeWhenConnected(String message) throws IOException, InterruptedException {
        lock.lock();
        try {
            while (!connected) {
                final boolean signalled = connectionCondition.await(connectTimeoutSecond, TimeUnit.SECONDS);
                if (!signalled) {
                    throw new ConnectionTimeoutException("连接超时");
                }
            }
            socketWriter.write(message);
            socketWriter.flush();
        } catch (IOException e) {
            this.disconnect();
            scheduleEventListener.onDisconnect();
            throw e;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reads messages until the connection drops or the client stops, then schedules a
     * reconnect if the client is still running.
     */
    private void eventLoop() {
        try {
            while (running && connected) {
                final String json = socketReader.readLine();
                if (json == null) {
                    // readLine() returns null at end of stream: the peer closed the connection.
                    // Route this through the failure path below instead of spinning on a dead socket.
                    throw new IOException("服务端已关闭连接");
                }
                final SocketMessage message = clientMessageConvertor.decode(json);
                final String requestUid = message.getRequestUid();
                final WaitResponseLock<SocketMessage> waiter =
                        requestUid == null ? null : responseWaitMap.remove(requestUid);
                if (waiter != null) {
                    waiter.setResponse(message);
                } else {
                    scheduleEventListener.onMessage(message);
                }
            }
        } catch (Exception e) {
            // During shutdown the socket is closed on purpose, so only log while running.
            if (running) {
                log.error("接收发生异常， 连接断开", e);
            }
            this.disconnect();
            scheduleEventListener.onDisconnect();
        }
        if (running) {
            eventLoopPool.execute(this::connect);
        }
    }

    /**
     * Tries randomly chosen endpoints until one accepts the connection or the client stops.
     *
     * <p>On success the event loop is started, the listener's {@code onConnect()} is invoked
     * and waiting threads are signalled. The loop ends early if the thread is interrupted
     * between attempts.
     */
    public void connect() {
        while (!connected && running) {
            final InetSocketAddress address = endpoints.get(random.nextInt(endpoints.size()));
            lock.lock();
            try {
                log.info("尝试连接到 {}", address);
                socket = new Socket();
                socket.setKeepAlive(true);
                // Bounded connect: the lock is held here, so an unreachable endpoint must not
                // block senders for the operating system's default connect timeout.
                socket.connect(address, connectTimeoutMillis());
                final InputStream inputStream = socket.getInputStream();
                final OutputStream outputStream = socket.getOutputStream();
                this.socketReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
                this.socketWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
                connected = true;
                connectionCondition.signalAll();
                // The reader is started before onConnect() so replies to requests issued from
                // the callback can be received.
                eventLoopPool.execute(this::eventLoop);
                scheduleEventListener.onConnect();
                afterConnected = true;
                afterConnectedCondition.signalAll();
                log.info("连接成功 {}", address);
                break;
            } catch (Exception e) {
                log.warn("连接失败:" + address, e);
                disconnect();
            } finally {
                lock.unlock();
            }
            try {
                Thread.sleep(RECONNECT_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt and stop retrying; continuing would make every
                // following sleep fail immediately and turn this into a hot loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Converts {@link #connectTimeoutSecond} to a non-negative millisecond value that fits an int. */
    private int connectTimeoutMillis() {
        final long millis = TimeUnit.SECONDS.toMillis(connectTimeoutSecond);
        return (int) Math.min(Integer.MAX_VALUE, Math.max(0L, millis));
    }

    /**
     * Starts connecting in the background and waits until the first connection has been
     * fully set up.
     *
     * @throws NullPointerException  if the endpoints or the event listener have not been set
     * @throws IllegalStateException if the endpoint list is empty, or if no connection was
     *                               set up within {@code connectTimeoutSecond} seconds
     */
    @Override
    public void start() {
        Objects.requireNonNull(endpoints, "endpoints 为空， 请正确配置参数");
        Objects.requireNonNull(scheduleEventListener, "scheduleEventListener 为空， 请正确配置参数");
        if (endpoints.isEmpty()) {
            // Fail fast: an empty list would otherwise only surface as a timeout below.
            throw new IllegalStateException("endpoints 为空， 请正确配置参数");
        }
        running = true;
        lock.lock();
        try {
            eventLoopPool.execute(this::connect);
            while (!afterConnected) {
                final boolean awake = afterConnectedCondition.await(connectTimeoutSecond, TimeUnit.SECONDS);
                if (!awake) {
                    throw new IllegalStateException("连接未成功, 等待时间: " + connectTimeoutSecond + "S");
                }
            }
        } catch (InterruptedException e) {
            log.warn("线程中断", e);
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** Marks the client as not connected and closes the socket if it is still open. */
    @Override
    public void disconnect() {
        connected = false;
        afterConnected = false;
        if (socket != null && !socket.isClosed()) {
            try {
                socket.close();
            } catch (IOException e) {
                log.warn("关闭连接失败", e);
            }
        }
    }

    /**
     * Stops the client: closes the connection, shuts the pool down and waits for its
     * threads to finish. Returns immediately when the client is already shut down.
     */
    @Override
    public synchronized void shutdown() {
        if (!running && eventLoopPool.isShutdown()) {
            return;
        }
        log.info("关闭客户端中...");
        running = false;
        disconnect();
        eventLoopPool.shutdown();
        try {
            while (!eventLoopPool.awaitTermination(connectTimeoutSecond, TimeUnit.SECONDS)) {
                // keep waiting until the pool threads have finished
            }
        } catch (InterruptedException e) {
            // Stop waiting, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
        log.info("关闭客户端成功...");
    }
}
